In this project, our goal is to write a software pipeline to identify the lane boundaries in a video, but the main output or product we want you to create is a detailed writeup of the project. Check out the writeup template for this project and use it as a starting point for creating your own writeup.
The goals / steps of this project are the following:
The images for camera calibration are stored in the folder called camera_cal. The images in test_images are for testing your pipeline on single frames. If you want to extract more test images from the videos, you can simply use an image writing method like cv2.imwrite(), i.e., you can read the video in frame by frame as usual, and for frames you want to save for later you can write to an image file.
To help the reviewer examine your work, please save examples of the output from each stage of your pipeline in the folder called output_images, and include a description in your writeup for the project of what each image shows. The video called project_video.mp4 is the video your pipeline should work well on.
The challenge_video.mp4 video is an extra (and optional) challenge for you if you want to test your pipeline under somewhat trickier conditions. The harder_challenge.mp4 video is another optional challenge and is brutal!
# Camera Calibration
import numpy as np
import cv2
import glob
import pickle
import os
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
%matplotlib inline
# --- Project directory / filename constants ---
WORKING_DIRECTORY = 'data/'          # root folder for all input and output data
IMAGE_EXTENSION = '.jpg'
OUTPUT_DIRECTORY = 'output_images/'  # where annotated/intermediate images are saved
DATACACHE_DIRECTORY = "datacache/"   # where pickled calibration/transform caches live
# Calibration Constants #
CALIBRATION_DIRECTORY = 'camera_cal/'
CALIBRATION_PREFIX = 'corners_found'
# Glob pattern for the chessboard calibration images, e.g. data/camera_cal/*.jpg
calibration_path = "{}{}{}".format(WORKING_DIRECTORY, CALIBRATION_DIRECTORY, '*'+IMAGE_EXTENSION)
# Cached camera matrix / distortion coefficients are stored here.
pickle_file = os.path.join(WORKING_DIRECTORY, DATACACHE_DIRECTORY, "calibration_pickle.p")
# Interior corner count of the calibration chessboard: (columns, rows).
CHESSBOARD_SIZE = (9,6)
# Threshold Constants #
TEST_IMAGE_DIRECTORY = 'test_images/'
THRESHOLDING_PREFIX = 'thresholded'
COLOR_STACKED_PREFIX = 'color_stacked'
UNDISTORTED_PREFIX = 'undistorted'
TEST_IMAGES_PREFIX = 'test'
# Glob pattern for the single-frame test images, e.g. data/test_images/test*.jpg
test_images_path = "{}{}{}".format(WORKING_DIRECTORY, TEST_IMAGE_DIRECTORY, TEST_IMAGES_PREFIX+'*'+IMAGE_EXTENSION)
# Perspective Transform Constants #
WARPED_PREFIX = 'warped'
# Cached perspective matrices (M and its inverse) are stored here.
M_pickle_file = os.path.join(WORKING_DIRECTORY, DATACACHE_DIRECTORY, "M_Minv_pickle.p")
# Lane Tracking Constants #
LINES_DRAWN_PREFIX = 'green_lines'
TRACKED_PREFIX = 'tracked'
# Calibrate the camera using a 9x6 checkerboard.
# Build the object-point template once: (x, y, 0) grid coordinates of the
# interior chessboard corners (the board is assumed flat, so z = 0).
objp = np.zeros((CHESSBOARD_SIZE[1]*CHESSBOARD_SIZE[0], 3), np.float32)
objp[:,:2] = np.mgrid[0:CHESSBOARD_SIZE[0], 0:CHESSBOARD_SIZE[1]].T.reshape(-1, 2)

# Arrays to store object points and image points from all the images
objpoints = []  # 3-Dim points in real-world space (same template per image)
imgpoints = []  # 2-Dim points in the image plane (detected corners)

# Load Calibration Images
calibration_images = glob.glob(calibration_path, recursive=True)

# Ensure the output directory exists before writing annotated images.
# BUG FIX: the original if/elif created at most ONE missing path level per
# run, so a cold start crashed on cv2.imwrite; makedirs builds all levels.
os.makedirs(os.path.join(WORKING_DIRECTORY, OUTPUT_DIRECTORY), exist_ok=True)

# Walk through images and search for checkerboard corners
for idx, fname in enumerate(calibration_images):
    img = mpimg.imread(fname)  # matplotlib reads JPEGs as RGB uint8
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    # Find the checkerboard corners
    ret, corners = cv2.findChessboardCorners(gray, CHESSBOARD_SIZE, None)
    # If found, add object points, image points
    if ret == True:
        print('Calibrating image:', fname)
        imgpoints.append(corners)
        objpoints.append(objp)
        # Draw and display found corners
        cv2.drawChessboardCorners(img, CHESSBOARD_SIZE, corners, ret)
        output_img_path = "{}{}{}{}{}".format(WORKING_DIRECTORY, OUTPUT_DIRECTORY,
                                              CALIBRATION_PREFIX, str(idx), IMAGE_EXTENSION)
        print('Saving Calibrated image:', output_img_path)
        # BUG FIX: cv2.imwrite expects BGR channel order, but img is RGB
        # (read via mpimg) — without the swap the drawn corner colours are
        # red/blue inverted in the saved files.
        cv2.imwrite(output_img_path, cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
# Load the cached calibration data if present; otherwise start an empty cache.
if os.path.exists(pickle_file):
    # FIX: use a context manager so the file handle is always closed
    # (the original leaked the handle from a bare open()).
    with open(pickle_file, "rb") as f:
        dist_pickle = pickle.load(f)
else:
    dist_pickle = {}

# Load one image purely to obtain the frame size expected by calibrateCamera.
img = cv2.imread(calibration_images[0])
img_size = (img.shape[1], img.shape[0])

# Perform calibration given object points and image points (or reuse cache).
if ("mtx" in dist_pickle and "dist" in dist_pickle):
    mtx = dist_pickle["mtx"]
    dist = dist_pickle["dist"]
else:
    ret, mtx, dist, _, _ = cv2.calibrateCamera(objpoints, imgpoints, img_size, None, None)
    # Save camera calibration result data
    dist_pickle = {}
    dist_pickle["mtx"] = mtx
    dist_pickle["dist"] = dist
    # BUG FIX: makedirs creates every missing path level in one call; the
    # original if/elif created at most one level per run, so a cold start
    # crashed on the pickle.dump below.
    os.makedirs(os.path.join(WORKING_DIRECTORY, DATACACHE_DIRECTORY), exist_ok=True)
    with open(pickle_file, "wb") as f:
        pickle.dump(dist_pickle, f)
import pickle  # already imported at the top; kept for notebook-cell independence
# Read back the cached camera matrix and distortion coefficients.
# FIX: context manager closes the file handle (the original leaked it).
with open(pickle_file, "rb") as f:
    dist_pickle = pickle.load(f)
mtx = dist_pickle["mtx"]
dist = dist_pickle["dist"]
def abs_sobel_thresh(sobel_img, sobel_kernel=9, thresh=(20, 100)):
    """Threshold the absolute value of a raw Sobel response.

    Args:
        sobel_img: signed Sobel derivative image (computed by the caller).
        sobel_kernel: unused here; kept for backward compatibility with
            callers that pass it.
        thresh: inclusive (low, high) bounds on the 0-255 rescaled response.

    Returns:
        uint8 binary image with 1 where the scaled response is inside thresh.
    """
    # Take the absolute value of the Sobel response.
    abs_sobel = np.absolute(sobel_img)
    # BUG FIX: an all-zero input previously divided by zero (NaN output);
    # return an all-zero mask instead.
    peak = np.max(abs_sobel)
    if peak == 0:
        return np.zeros(abs_sobel.shape, dtype=np.uint8)
    # Rescale to 8-bit so thresholds are independent of image intensity.
    scaled_sobel = np.uint8(255*abs_sobel/peak)
    # Apply the threshold band.
    binary_output = np.zeros_like(scaled_sobel)
    binary_output[(scaled_sobel >= thresh[0]) & (scaled_sobel <= thresh[1])] = 1
    return binary_output
def mag_threshold(sobelx, sobely, sobel_kernel=9, mag_thresh=(0, 255)):
    """Binary mask of pixels whose gradient magnitude lies inside mag_thresh.

    Args:
        sobelx, sobely: raw Sobel derivatives along x and y.
        sobel_kernel: unused here; kept for backward compatibility with
            callers that pass it.
        mag_thresh: inclusive (low, high) bounds on the 0-255 rescaled
            gradient magnitude.

    Returns:
        uint8 binary image with 1 where the scaled magnitude is in range.
    """
    # Calculate the gradient magnitude
    gradmag = np.sqrt(sobelx**2 + sobely**2)
    # BUG FIX: an all-zero gradient previously divided by zero; return an
    # all-zero mask instead.
    peak = np.max(gradmag)
    if peak == 0:
        return np.zeros(gradmag.shape, dtype=np.uint8)
    # Rescale to 8 bit so thresholds are independent of image intensity.
    scale_factor = peak/255
    gradmag = (gradmag/scale_factor).astype(np.uint8)
    # Create a binary image of ones where threshold is met, zeros otherwise
    mag_binary = np.zeros_like(gradmag)
    mag_binary[(gradmag >= mag_thresh[0]) & (gradmag <= mag_thresh[1])] = 1
    return mag_binary
def dir_threshold(sobelx, sobely, sobel_kernel=3, thresh=(0, np.pi/2)):
    """Binary mask of pixels whose gradient direction lies inside thresh.

    sobel_kernel is unused here (kept for signature compatibility); the
    Sobel responses are computed by the caller. The direction is the
    arctangent of |sobely| / |sobelx|, so it lies in [0, pi/2].
    """
    direction = np.arctan2(np.absolute(sobely), np.absolute(sobelx))
    lo, hi = thresh
    # 1.0 where the direction is inside the band, 0.0 elsewhere (float mask,
    # matching the original's zeros_like(float) output).
    return np.where((direction >= lo) & (direction <= hi), 1.0, 0.0)
def apply_thresholds(img, kernel_size=9, l_thresh=(115, 190), v_thresh=(190, 255), s_thresh=(150, 255), sx_thresh=(19, 100), sy_thresh=(25, 40)):
    """Apply colour-channel and Sobel-gradient thresholds to an RGB frame.

    Args:
        img: RGB image (as read by matplotlib.image).
        kernel_size: Sobel aperture size for the derivative operators.
        l_thresh, v_thresh, s_thresh: inclusive (low, high) bounds for the
            HLS L-channel, HSV V-channel and HLS S-channel respectively.
        sx_thresh, sy_thresh: bounds for the scaled |Sobel x| / |Sobel y|.

    Returns:
        (color_binary, combined): a 3-channel stack of the L/S/V masks for
        visualisation, and the single-channel combined binary mask.
    """
    img = np.copy(img)
    # Convert to HLS color space and separate the S and L channel.
    # BUG FIX: np.float was removed in NumPy 1.24; np.float64 is the same type.
    hls = cv2.cvtColor(img, cv2.COLOR_RGB2HLS).astype(np.float64)
    l_channel = hls[:,:,1]
    s_channel = hls[:,:,2]
    # Convert to HSV color space and separate the V channel
    hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV).astype(np.float64)
    v_channel = hsv[:,:,2]
    # Threshold l-channel
    l_binary = np.zeros_like(l_channel)
    l_binary[(l_channel >= l_thresh[0]) & (l_channel <= l_thresh[1])] = 1
    # Threshold s-channel
    s_binary = np.zeros_like(s_channel)
    s_binary[(s_channel >= s_thresh[0]) & (s_channel <= s_thresh[1])] = 1
    # Threshold v-channel
    v_binary = np.zeros_like(v_channel)
    v_binary[(v_channel >= v_thresh[0]) & (v_channel <= v_thresh[1])] = 1
    # Sobel operator on the V channel.
    sobelx = cv2.Sobel(v_channel, cv2.CV_64F, 1, 0, ksize=kernel_size)
    sobely = cv2.Sobel(v_channel, cv2.CV_64F, 0, 1, ksize=kernel_size)
    # Threshold the absolute gradient in each direction.
    # NOTE(review): grady_binary is computed but never used in `combined`
    # below — kept so sy_thresh remains meaningful to existing callers.
    gradx_binary = abs_sobel_thresh(sobelx, sobel_kernel=15, thresh=sx_thresh)
    grady_binary = abs_sobel_thresh(sobely, sobel_kernel=15, thresh=sy_thresh)
    # Threshold magnitude and direction gradients
    mag_binary = mag_threshold(sobelx, sobely, sobel_kernel=kernel_size, mag_thresh=(30, 100))
    dir_binary = dir_threshold(sobelx, sobely, sobel_kernel=15, thresh=(0.7, 1.3))
    # Stack the three colour masks for visual inspection.
    color_binary = np.dstack(( l_binary, s_binary, v_binary))
    # NOTE(review): the (l & mag & v) clause is subsumed by the (mag & v)
    # clause below; kept as-is to preserve the exact combined mask.
    combined = np.zeros_like(s_channel)
    combined[((v_binary == 1) & (s_binary == 1)) |
             ((gradx_binary == 1) & (v_binary == 1)) |
             ((mag_binary == 1) & (dir_binary == 1)) |
             ((l_binary == 1) & (mag_binary == 1) & (v_binary == 1)) |
             ((mag_binary == 1) & (v_binary == 1))] = 1
    return color_binary, combined
import numpy as np
import cv2
class tracker():
    """Sliding-window lane-centroid tracker.

    Splits a warped binary road image into horizontal levels and locates the
    left/right lane centroids per level with a 1-D convolution, smoothing the
    returned centroids over the most recent frames.
    """

    def __init__(self, win_width, win_height, margin, y_m = 1, x_m = 1, smoothing_factor = 15):
        # Past (left, right) centroid sets from previous frames; averaged to
        # smooth the output markers.
        self.recent_centers = []
        # Pixel width of the convolution window used to localise lane pixels.
        self.window_width = win_width
        # Pixel height of one vertical level (the image is split into levels).
        self.window_height = win_height
        # "Padding": how far (pixels) each window may slide between levels.
        self.margin = margin
        # Metres per pixel along the vertical axis.
        self.ym_per_pix = y_m
        # Metres per pixel along the horizontal axis.
        self.xm_per_pixel = x_m
        # Number of most recent frames averaged together.
        self.smooth_factor = smoothing_factor
        # TODO: Maybe try to compare curvatures

    # Main tracking function - finds and stores lane segment positions.
    # Uses the 'window sliding' technique: find pixel centroids with a 1-D
    # convolution per horizontal slice of the image.
    def find_window_centroids(self, warped):
        window_width = self.window_width
        window_height = self.window_height
        margin = self.margin
        window_centroids = []  # (left, right) window centroid positions per level
        window = np.ones(window_width)  # convolution template

        # 1) Starting positions for the left and right lanes: histogram of
        # the bottom quarter of the image, split into left and right halves.
        l_sum = np.sum(warped[(3*warped.shape[0]//4):,:(warped.shape[1]//2)], axis=0)
        l_center = np.argmax(np.convolve(window,l_sum))-window_width/2
        r_sum = np.sum(warped[(3*warped.shape[0]//4):, (warped.shape[1]//2):], axis=0)
        r_center = np.argmax(np.convolve(window,r_sum))-window_width/2+(warped.shape[1]//2)
        # Add what we found for the first layer
        window_centroids.append((l_center, r_center))

        # 2) Walk up the image level by level, re-centring each window on the
        # strongest convolution response near the previous centre.
        for level in range(1, (int)(warped.shape[0]/window_height)):
            image_layer = np.sum(warped[int(warped.shape[0]-(level+1)*window_height):
                                        int(warped.shape[0]-level*window_height),:], axis=0)
            conv_signal = np.convolve(window, image_layer)
            # window_width/2 offset: the convolution's reference point is the
            # right side of the template, so shift back to its centre.
            offset = window_width//2
            l_min_index = int(max(l_center+offset-margin,0))
            l_max_index = int(min(l_center+offset+margin,warped.shape[1]))
            l_center = np.argmax(conv_signal[l_min_index:l_max_index])+l_min_index-offset
            # Find the best right centroid by using the past right center as reference.
            r_min_index = int(max(r_center+offset-margin,0))
            r_max_index = int(min(r_center+offset+margin, warped.shape[1]))
            r_center = np.argmax(conv_signal[r_min_index:r_max_index])+r_min_index-offset
            # Add what we found for that layer
            window_centroids.append((l_center, r_center))

        self.recent_centers.append(window_centroids)
        # BUG FIX: keep only the frames the average below actually uses, so
        # memory stays bounded on long videos (this list previously grew
        # without limit). The returned value is unchanged.
        if len(self.recent_centers) > self.smooth_factor:
            self.recent_centers = self.recent_centers[-self.smooth_factor:]
        # Averaged line centers keep the markers from jumping around.
        return np.average(self.recent_centers[-self.smooth_factor:], axis = 0)
def window_mask(width, height, img_ref, center, level):
    """Return a binary mask covering one sliding-window cell.

    The cell is `height` rows tall, counted from the bottom of the image
    (level 0 is the bottom slice), and spans 2*width columns centred on
    `center`, clipped to the image bounds.
    """
    rows = img_ref.shape[0]
    cols = img_ref.shape[1]
    top = int(rows - (level + 1) * height)
    bottom = int(rows - level * height)
    left = max(0, int(center - width))
    right = min(int(center + width), cols)
    mask = np.zeros_like(img_ref)
    mask[top:bottom, left:right] = 1
    return mask
# Scale both .PNG (already 0-1 floats) and JPEG (0-255) images to a common 0-1 range.
def normalize_pixels(img):
    """Return img scaled into [0, 1]; inputs already in that range pass through unchanged."""
    if np.max(img) > 1.0:
        # 0-255 input: scale down to the 0-1 range as float64.
        return np.copy(np.multiply(img, 1.0 / 255.0)).astype(np.float64)
    # Already normalized: hand back the same object untouched.
    return img
# Scale both .PNG (0-1 floats) and JPEG (0-255) images to a common 0-255 range.
def denormalize_pixels(img):
    """Return img scaled up to [0, 255]; inputs already above 1.0 pass through unchanged."""
    if np.max(img) <= 1.0:
        # 0-1 input: scale up to the 0-255 range as float64.
        return np.copy(np.multiply(img, 255.0)).astype(np.float64)
    # Already denormalized: hand back the same object untouched.
    return img
# Load Test Images
# Collect every single-frame test image matching test_images_path
# (e.g. data/test_images/test*.jpg) for the pipeline below.
test_images = glob.glob(test_images_path, recursive=True)
def pipeline(test_images):
    """Run the full lane-finding pipeline over a list of test-image paths.

    Per image: undistort -> colour/gradient threshold -> perspective warp ->
    sliding-window centroid search -> draw the windows, then save every
    intermediate stage to the output directory and plot a summary grid.
    The perspective matrices are cached per filename in a pickle file.

    Args:
        test_images: list of image file paths to process.
    """
    # Load the perspective-matrix cache (M / Minv per image) if it exists.
    # FIX: context managers close the pickle file handles (previously leaked).
    M_pickle_file = os.path.join(WORKING_DIRECTORY, DATACACHE_DIRECTORY, "M_Minv_pickle.p")
    if os.path.exists(M_pickle_file):
        with open(M_pickle_file, "rb") as pf:
            M_pickle = pickle.load(pf)
    else:
        M_pickle = {}

    # Create the output directory up front. BUG FIX: os.makedirs builds every
    # missing path level; the original if/elif created at most one level per
    # run, so a cold start crashed on the first image write below.
    os.makedirs(os.path.join(WORKING_DIRECTORY, OUTPUT_DIRECTORY), exist_ok=True)

    # Walk through test images
    for idx, filepath in enumerate(test_images):
        img = mpimg.imread(filepath)  # RGB (matplotlib reader)
        head, filename = os.path.split(filepath)
        # Datacache keys for this image's perspective matrices.
        M_str = "M_" + filename
        Minv_str = "Minv_" + filename

        # Undistort the image using the camera calibration computed above.
        img = cv2.undistort(img, mtx, dist, None, mtx)
        color_binary, preprocessed_img = apply_thresholds(img)

        # Perspective transform of the thresholded image.
        img_size = (img.shape[1], img.shape[0])
        bottom_width = .70  # bottom trapezoid edge, fraction of image width
        mid_width = .12     # top trapezoid edge, fraction of image width
        height_pct = .635   # trapezoid top position, fraction of image height
        bottom_trim = .935  # cut-off from the top to avoid the car's hood

        if (M_str in M_pickle and Minv_str in M_pickle):
            # Cache hit: reuse the stored matrices.
            M = M_pickle[M_str]
            Minv = M_pickle[Minv_str]
            warped = cv2.warpPerspective(preprocessed_img, M, img_size, flags=cv2.INTER_LINEAR)
        else:
            # Source points: a road-aligned trapezoid.
            src_pt_1 = [img.shape[1]*(.5-mid_width/2), img.shape[0]*height_pct]
            src_pt_2 = [img.shape[1]*(.5+mid_width/2), img.shape[0]*height_pct]
            src_pt_3 = [img.shape[1]*(.5+bottom_width/2), img.shape[0]*(bottom_trim/2+.5)]
            src_pt_4 = [img.shape[1]*(.5-bottom_width/2), img.shape[0]*(bottom_trim/2+.5)]
            src = np.float32([src_pt_1, src_pt_2, src_pt_3, src_pt_4])
            # Destination points: a centred rectangle with 20% side margins.
            offset = img_size[0]*.20
            dst = np.float32([[offset, 0],
                              [img_size[0]-offset, 0],
                              [img_size[0]-offset, img_size[1]],
                              [offset, img_size[1]]])
            M = cv2.getPerspectiveTransform(src, dst)
            Minv = cv2.getPerspectiveTransform(dst, src)
            warped = cv2.warpPerspective(preprocessed_img, M, img_size, flags=cv2.INTER_LINEAR)
            warped = denormalize_pixels(warped)
            # Overwrite the datacache entries for this image.
            M_pickle[M_str] = M
            M_pickle[Minv_str] = Minv

        # Instantiate the tracker class for lane centroid search.
        window_width = warped.shape[1]//25
        window_height = warped.shape[0]//9  # 9 vertical levels
        real_to_pix_x = 4./384   # metres per pixel, horizontal
        real_to_pix_y = 10./720  # metres per pixel, vertical
        curve_centers = tracker(win_width = window_width, win_height = window_height, margin = 25,
                                y_m = real_to_pix_y, x_m = real_to_pix_x, smoothing_factor = 15)
        # Use the tracking function to find centroids for drawing lane lines.
        window_centroids = curve_centers.find_window_centroids(warped)

        # Masks accumulating the drawn left/right windows.
        l_points = np.zeros_like(warped)
        r_points = np.zeros_like(warped)
        # Centroid x-positions per level (collected for later curve fitting).
        right_x = []
        left_x = []

        # Draw the window at each level into the running masks.
        for level in range(0, len(window_centroids)):
            l_mask = window_mask(window_width, window_height, warped, window_centroids[level][0], level)
            r_mask = window_mask(window_width, window_height, warped, window_centroids[level][1], level)
            left_x.append(window_centroids[level][0])
            right_x.append(window_centroids[level][1])
            # Add graphic points from the window mask to the total pixels found.
            l_points[(l_points == 255) | ((l_mask == 1.))] = 255
            r_points[(r_points == 255) | ((r_mask == 1.))] = 255

        ## Draw the results ##
        # Add both left and right pixels together.
        template = np.array(r_points+l_points, np.uint8)
        zero_channel = np.zeros_like(template)
        # Render window pixels in green.
        template = np.array(cv2.merge((zero_channel, template, zero_channel)), np.uint8)
        # No-op when warped was already denormalized above (the helper only
        # scales inputs whose maximum is <= 1.0).
        warped = denormalize_pixels(warped)
        # Expand the single-channel road image to 3 channels for blending.
        warpage = np.array(cv2.merge((warped, warped, warped)), np.uint8)
        # Overlay the original road image with the window results.
        result = cv2.addWeighted(warpage, 1, template, 0.5, 0.)

        ## Save transitional images and results to the output directory ##
        undistorted_img_path = "{}{}{}{}{}".format(WORKING_DIRECTORY, OUTPUT_DIRECTORY,
                                                   UNDISTORTED_PREFIX, str(idx), IMAGE_EXTENSION)
        color_stacked_img_path = "{}{}{}{}{}".format(WORKING_DIRECTORY, OUTPUT_DIRECTORY,
                                                     COLOR_STACKED_PREFIX, str(idx), IMAGE_EXTENSION)
        thresholded_img_path = "{}{}{}{}{}".format(WORKING_DIRECTORY, OUTPUT_DIRECTORY,
                                                   THRESHOLDING_PREFIX, str(idx), IMAGE_EXTENSION)
        warped_img_path = "{}{}{}{}{}".format(WORKING_DIRECTORY, OUTPUT_DIRECTORY,
                                              WARPED_PREFIX, str(idx), IMAGE_EXTENSION)
        lined_img_path = "{}{}{}{}{}".format(WORKING_DIRECTORY, OUTPUT_DIRECTORY,
                                             LINES_DRAWN_PREFIX, str(idx), IMAGE_EXTENSION)

        print('')
        print('Saving Undistorted Result image:', undistorted_img_path)
        # img is RGB; cv2.imwrite expects BGR, so swap channels before saving
        # (BGR2RGB and RGB2BGR perform the identical channel swap).
        img = cv2.cvtColor(np.copy(img), cv2.COLOR_BGR2RGB)
        cv2.imwrite(undistorted_img_path, img)
        print('Saving Color Stacked image:', color_stacked_img_path)
        mpimg.imsave(color_stacked_img_path, color_binary)
        print('Saving Thresholded image:', thresholded_img_path)
        mpimg.imsave(thresholded_img_path, preprocessed_img, cmap='gray')
        print('Saving Warped image:', warped_img_path)
        mpimg.imsave(warped_img_path, warped, cmap='gray')
        print('Saving Curved Lines Drawn on Warped image:', lined_img_path)
        mpimg.imsave(lined_img_path, result)

        # Plot transitional images for analysis.
        f, ((ax0, ax1, ax2, ax3), (ax4, ax5, ax6, ax7)) = plt.subplots(2, 4, figsize=(48, 25))
        f.tight_layout()
        # Swap back to RGB for matplotlib display (img was converted above).
        img = cv2.cvtColor(np.copy(img), cv2.COLOR_BGR2RGB)
        ax0.imshow(img)
        ax0.set_title('Undistorted Image_'+str(idx), fontsize=40)
        ax1.imshow(color_binary)
        ax1.set_title('Color Maps Thresholded', fontsize=40)
        ax2.imshow(preprocessed_img, cmap='gray')
        ax2.set_title('Combined Gradients', fontsize=40)
        ax3.imshow(warped, cmap='gray')
        ax3.set_title('Perspective Transforms', fontsize=40)
        # Inverse-warp the warped image back to the road view.
        inv_warped = cv2.warpPerspective(warped, Minv, img_size, flags=cv2.INTER_LINEAR)
        ax4.imshow(inv_warped, cmap='gray')
        ax4.set_title('Inverted Warped Image_'+str(idx), fontsize=40)
        # Histogram over the lower three quarters of the warped image.
        warped_mid_y = warped.shape[0]//4
        histogram = np.sum(warped[warped_mid_y:,:], axis=0)
        ax5.plot(histogram)
        ax5.set_title('Histogram of Warped', fontsize=40)
        # Final result (and a grayscale duplicate for comparison).
        ax6.imshow(result)
        ax6.set_title('Warped - Curved Lines Drawn', fontsize=40)
        ax7.imshow(result, cmap='gray')
        ax7.set_title('Warped - Curved Lines Drawn', fontsize=40)
        # Adjust subplots
        plt.subplots_adjust(left=0., right=1, top=0.99, bottom=0.)

    # Persist the (possibly updated) perspective-matrix cache.
    with open(M_pickle_file, "wb") as pf:
        pickle.dump(M_pickle, pf)
pipeline(test_images)